Java Concurrent - ThreadPoolExecutor特性&实现

1. 为什么要使用线程池?

这点在注释最开始说的很清楚:

Thread pools address two different problems

  1. they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead,
  2. and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks.

其中第一点也可以这样简单描述:

Reusing threads that have already been created instead of creating new ones (an expensive process)
Answer from Thread vs ThreadPool on Stackoverflow

2. Worst pool

在一切开始之前, 先思考一个问题: 仅就”重用线程”这个首要目标来说, 该如何实现一个线程池? 比较明确的几点是:

1. 一定创建固定数目的`Thread`, 任务由`Runnable`形式交给`Thread`执行.
2. 根据线程的生命周期, 一个线程执行完成即进入 *Dead* 状态, 由此可知`Thread`不能停, 需要一直维持 *Running* 状态. 也就是说, 需要在其中执行一个”死循环”.

于是最开始想到是这样的写法:

1
2
3
4
5
public class PoolThread {
private Thread thread;
private Runnable task;
...
}

这样实现需要在每个新任务到来时遍历PoolThread, 判断其工作状态从而决定是否将任务提交给他. 而且弹性比较差, 当所有线程空闲时, 新任务无法处理.
这里提前剧透, 看了ThreadPoolExecutor的实现, 是引入了一个生产-消费模型, Runnable直接进入队列(实际上有的并不是, 后面再提), 而Thread从队列中消费.
有了这些信息, 已经可以写出一个辣鸡队列了, 姑且称之为WorstPool. 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import com.google.common.collect.Queues;

/**
* @author shibinfei
*/
public class WorstPool {

private BlockingQueue<Runnable> taskQueue = Queues.newArrayBlockingQueue(5);

public WorstPool(int poolSize) {
IntStream.range(0, poolSize).forEach(x -> createThreadAndRun());
}

private void createThreadAndRun() {
Thread thread = new Thread(() -> {
while (true) {
Runnable cmd;
try {
cmd = taskQueue.take(); // keep waiting
} catch (InterruptedException e) {
continue; // ...
}

cmd.run();
}
});
thread.start();
}

public void execute(Runnable cmd) {
taskQueue.add(cmd); // 满了就抛出异常
}
}

调用代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) {
WorstPool pool = new WorstPool(2);

for (int i = 0; i < 6; i++) {
final int id = i;
pool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " DONE WITH " + id);
});
}

}

可以运行下看看结果~

3. ThreadPoolExecutor

ThreadPoolExecutorWorstPool的主要模型是一致的. 了解后者之后, ThreadPoolExecutor就比较容易切入. 先尝试将WorstPool的成员对应到ThreadPoolExecutor. 之后再根据ThreadPoolExecutor的各种特性逐个了解.

3.1 找相同 - Worker & BlockingQueue

观察ThreadPoolExecutor的类成员, 可以很容易找到和WorstPool的对应.

- `BlockingQueue<Runnable> workQueue` - `taskQueue` in `WorstPool`
- 包含一个`Thread`成员的内部类`Worker` - `WorstPool`中没有做封装, 直接是一个`Thread`

3.1.1 Worker

Worker成员较少, 可以先关注下面几个

- `Thread thread`, 用来执行任务的线程
- `Runnable task`,  在创建`Worker`会同时为其指定一个任务. 后续的任务将从队列中获取.

ThreadPoolExecutor中, Worker不是在线程池初始化时创建的. 而是在提交任务时创建的, 即在客户端调用入口execute(Runnable cmd)方法中. Worker的创建以及执行任务流程大致如下:

addWorker方法中, Worker实例会被加入到HashSet<Worker> workers. 用于线程池管理所有的Worker.

Worker数量

Worker数量可能不是固定的, 在execute()中, 判断是否需要新建worker主要看corePoolSizemaximumPoolSize. 注释讲的很清楚:

A ThreadPoolExecutor will automatically adjust the pool size according to the bounds set by corePoolSize and maximumPoolSize.
When a new task is submitted in method execute, and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.

AbstractQueuedSynchronizer

Worker本身继承了AbstractQueuedSynchronizer, 相关方法有lock(), tryLock(),isLocked(),unlock(). 等后续单独介绍AbstractQueuedSynchronizer, 暂时可以简单理解为, 内部维护了一个ReentrantLock实例. 具体使用后面会提到~

Key Words

Worker workQueue corePoolSize maximumPoolSize AbstractQueuedSynchronizer thread workers

3.2 ctl

ThreadPoolExecutor中有一个乍看起来有些令人困惑的成员 - AtomicInteger ctl

The main pool control state, ctl, is an atomic integer packing two conceptual fields: workerCount, runState.
实际上一些相关的位操作只是为了将两个变量封装到一个中: 高三位表示runState. 剩下的表示workCount

WorkerCount

workerCount可以先简单地根据字面理解为Worker数量, 实际上却不大精准:

The workerCount is the number of workers that have been permitted to start and not permitted to stop.
// TODO

workerCountOf, ctlOf等几个方法都是很简单的二进制操作, 下面详细说明.

Bit操作详细说明

这里必须上源码了:

1
2
3
4
5
6
int CAPACITY   = (1 << (Integer.SIZE - 3)) - 1;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

CAPACITY1左移29位. 它的二进制表示为:
0001 1111 1111 1111 1111 1111 1111 1111
~CAPACITY
1110 0000 0000 0000 0000 0000 0000 0000

这样看就很好理解了. 两者一个是高位为1, 一个是低位为1, 这样在按位与操作时就会忽略其他为0的Bit, 达到封装两个值的效果. 可以看出workerCount的长度受限于29个bit, 最大为 (2 ^ 29) - 1. 而runState只有6种可能取值, 3位也够了.

KeyWords

ctl workerCount runState

3.3 生命周期

上文提到 ctl包装的另一个值就是runState. 它的作用是用来表示整个线程池的生命周期状态, 取值有如下几种:

The runState provides the main lifecyle control, taking on values:

   - RUNNING:  Accept new tasks and process queued tasks
- SHUTDOWN: Don't accept new tasks, but process queued tasks
- STOP:     Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
- TIDYING:  All tasks have terminated, workerCount is zero, the thread transitioning to state      TIDYING will run the terminated() hook method
- TERMINATED: terminated() has completed

下面几个方法都是对runState简单的读写操作, 几乎可以根据名称判断用处

  • runStateLessThan(int c, int s)
  • runStateAtLeast(int c, int s)
  • isTerminated()
  • isTerminating()
  • isShutdown()
  • isRunning(int c) // 是否处于RUNNING状态
  • advanceRunState(int target) // 将runState设置为目标值

各种状态的转换说明如下:

The runState monotonically increases over time, but need not hit each state. The transitions are:

- RUNNING -> SHUTDOWN: On invocation of shutdown(), perhaps implicitly in finalize()
- (RUNNING or SHUTDOWN) -> STOP: On invocation of shutdownNow()
* SHUTDOWN -> TIDYING:  When both queue and pool are empty
* STOP -> TIDYING:  When pool is empty
    * TIDYING -> TERMINATED: When the terminated() hook method has completed

可以看出来, 线程池初始化之后, 如果不调用shutdown, shutdownNow它是一直处于RUNNING状态的, 所以 生命周期的变化都始于这两个方法; 他们的作用都是试图停止线程池, 但是细节有所不同.

showdown

此操作调用之前提交的任务(即包含队列中的任务)都会被执行完, 但是不再接受新任务. 另外此方法的注释中提到: (shutdownNow也是如此)

This method does not wait for previously submitted tasks to complete execution. Use awaitTermination awaitTermination
这句的意思是, shutdown并不会阻塞当前线程, 从而等待所有任务执行完. 如果需要的话, 使用awaitTermination. 下面的代码可以说明这一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, Queues.newArrayBlockingQueue(1));
pool.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " in");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " out");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " interrupted");
}
});

pool.shutdown();
System.out.println("ShutDown OK");

这里的”ShutDown OK”马上就打印了, 但是线程池中的任务还没有完成.

awaitTermination的实现是通过一个Condition termination成员的await来实现的, 逻辑比较简单, 其中根据runState是否达到TERMINATED状态决定是否继续await. 通常有await就会有signal, 在后面会提到.

shutdownNow

此方法试图终止当前运行的任务, 并将队列中的任务全部移除. 由于试图终止的方式为interrupt, 所以实际上并不能保证一定成功.

shutdown & shutdownNow

来看一下两个方法的主要内容:

  1. advanceRunState(state)
    这里两个都有调用, 只不过目标值不同. 逻辑也仅仅为修改状态而已.
  2. interruptIdleWorkers vs interruptWorkers:
    两者都是试图对进行中的worker thread进行interrupt. 不同的是, 前者会先调用tryLock(). 而在runWorker的循环中, 每次执行task会先调用worker.lock(), 结束才后unlock. 所以说非空闲的任务不会受到影响.
  3. onShutdown
    留给子类的一个钩子, 学习ScheduledThreadPoolExecutor再关注
  4. drainQueue
    将队列中的元素抽取到另一个List中, 并移除此元素.
  5. tryTerminate
    两者都有调用. 具体说明之前先回顾一下前面的runState转换:
    • SHUTDOWN -> TIDYING: When both queue and pool are empty
    • STOP -> TIDYING: When pool is empty
      这个转换过程即为tryTerminate做的事情.

查看tryTerminate(), 其中值得注意的是, 当为SHUTDOWN状态, 且队列不为空时, tryTerminate方法是直接return的. 而在showdown过程中, 很可能正好处于这种情况, 此时shutdowntryTerminate的调用是无效的. 但是在后续, tryTerminate方法还会被调用一次, 即前面提到的processWorkerExit

所以一个ThreadPoolExecutor的生命周期转换以及触发操作如下:
RUNNING (shutdown) -> SHUTDOWN -> TIDYING(tryTerminate) -> TERMINATED
RUNNING (shutdownNow) -> STOP -> TIDYING(tryTerminate) -> TERMINATED

  1. RuntimePermission

3.4 Reject策略

回想workerCount策略:

  • workerCount小于corePoolSize, 有新任务会会创建Worker.
  • 如果达到了corePoolSize, 会将任务放到队列中.
  • 如果队列放不下了, 会尝试继续创建Worker

还有一点:

  • 如果workerCount即将超过maximumPoolSize, 那么将对对应的task执行Reject策略.

这个策略的抽象即为RejectedExecutionHandler#rejectedExecution(Runnable r, ThreadPoolExecutor executor).

四种预定义策略比较简单:

  1. AbortPolicy(默认). 抛出一个异常(RejectedExecutionException)
  2. CallerRunsPolicy. 将提交的Task直接还给主线程同步执行.
  3. DiscardPolicy. 非常简单, 直接放弃治疗
  4. DiscardOldestPolicy. 抛弃队列头的任务, 重试执行.

3.5 KeepAliveTime & allowCoreThreadTimeOut

通过两个参数corePoolSize, maximumPoolSize来控制Worker数量, 目标为使线程池更具有弹性, 保证一段时间内的任务量骤增也可以承受. 而下面则关于任务量从峰值降下来后, 如何减少线程池Worker数量, 从而减少资源占用.

默认情况下, 当Worker数量超过了corePoolSize之后, 且有Worker空闲了一段时间, 会有部分Worker被回收, 但是数量不会小于corePoolSize.

举个栗子, 假设corePoolSize == 5, maximumPoolSize == 10;. 之前任务很多, 所以创建了10个Worker, 而此时任务被处理完. 对于超过corePoolSize数量的线程, 如果空闲时间超过了keepAliveTime, 则会被回收.

默认情况下, 即便回收也是会保证活跃线程数量 >= corePoolSize的. 如果想打破这里逻辑, 可以设置alloCoreThreadTimeOuttrue.

对于超时时间的控制, 在getTask()中, 且仍然基于阻塞队列的特性:

1
2
3
4
5
6
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// ...
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
timedOut = true;

正常情况下, 使用take()会一直阻塞在这里, 而符合超时判断条件时, 则最多等keepAliveTime纳秒. 没有取到任务则timeOut则被置为true, 下次循环中会return null, 则对应的worker就结束了.(参考前文流程图) .

可以通过以下代码debuggetTask()方法

1
2
3
4
5
6
7
8
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1, TimeUnit.SECONDS, Queues.newArrayBlockingQueue(1));
pool.setKeepAliveTime(3, TimeUnit.SECONDS);

for (int i = 0; i < 3; i++) {
pool.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}

前三个任务, 两个直接交给了Worker, 一个从队列中消费, 所以getTask()的第一次调用可以忽略. 可以直接从第二次调用跟踪.

4. 整体概念回顾

5. Best Practice

5. 1 线程池大小应该设置多少合适

先给一个粗糙的结论

取决于程序为CPU密集/IO密集. 如果接近完全为CPU密集的程序, 线程数应设置为CPU内核数量. IO密集则需要增加线程数.
IO密集/CPU密集无法量化, 所以需要通过测试来决定.
// TODO 如何进行测试